37972f7d5f6ba11ee3817067a34382d0d9f8ff3d,src/main/java/com/continuuity/gateway/accessor/DatasetRestHandler.java,DatasetRestHandler,handleTableOperation,#MessageEvent#HttpRequest#MetricsHelper#LinkedList#Map#OperationContext#,249

Before Change



    // optional parameter encoding
    String encoding = getEncodingParameter(message, request, helper, parameters);
    if (encoding == null) { // error
      return;
    }

    // make sure the operations and parameters are valid
    HttpMethod method = request.getMethod();
    if (HttpMethod.GET.equals(method)) {
      // can be either a list or a read, list would have had ?op=list
      if (operation == TableOp.List) {
        if (row != null) {
          respondBadRequest(message, request, helper, "list operation cannot have row key");
          return;
        } else {
          respondBadRequest(message, request, helper,
                            "list operation not implemented", HttpResponseStatus.NOT_IMPLEMENTED);
          return;
        }
      }
      // make sure no other operation was given with ?op=
      if (operation != null) {
        respondBadRequest(message, request, helper, "invalid operation for method GET");
        return;
      }
      // must be a read, requires a row
      if (row == null) {
        respondBadRequest(message, request, helper, "read must have a row key");
        return;
      }
      if (columns != null && !columns.isEmpty() && (start != null || stop != null)) {
        respondBadRequest(message, request, helper, "read can only specify columns or range");
        return;
      }
      operation = TableOp.Read;

    } else if (HttpMethod.DELETE.equals(method)) {
      // make sure no operation was given with ?op=
      if (operation != null) {
        respondBadRequest(message, request, helper, "invalid operation for method GET");
        return;
      }
      // must be a delete, requires a row
      if (row == null) {
        respondBadRequest(message, request, helper, "delete must have a row key");
        return;
      }
      if (columns == null || columns.isEmpty()) {
        respondBadRequest(message, request, helper, "delete must have columns");
        return;
      }
      operation = TableOp.Delete;

    } else if (HttpMethod.PUT.equals(method)) {
      // make sure no operation was given with ?op=
      if (operation != null) {
        respondBadRequest(message, request, helper, "invalid operation for method PUT");
        return;
      }
      // must be a write, requires a row
      if (row == null) {
        operation = TableOp.Create;
      } else {
        operation = TableOp.Write;
      }

    } else if (HttpMethod.POST.equals(method)) {
      // make sure no operation was given with ?op=
      if (operation == null) {
        respondBadRequest(message, request, helper, "missing operation for method POST");
        return;
      }
      if (operation != TableOp.Increment) {
        respondBadRequest(message, request, helper, "invalid operation for method POST");
        return;
      }
      // must be increment, requires a row
      if (row == null) {
        respondBadRequest(message, request, helper, "increment must have a row key");
        return;
      }
      operation = TableOp.Increment;
    }

    Type stringMapType = new TypeToken<Map<String, String>>() {}.getType();
    // Type longMapType = new TypeToken<Map<String, Long>>() {}.getType();

    // for operations write and increment, there must be a JSON string in the body
    Map<String, String> valueMap = null;
    // Map<String, Long> longMap = null;
    try {
      if (operation == TableOp.Increment || operation == TableOp.Write) {
        InputStreamReader reader = new InputStreamReader(
          new ChannelBufferInputStream(request.getContent()), Charsets.UTF_8);
        if (operation == TableOp.Write) {
          valueMap = new Gson().fromJson(reader, stringMapType);
        } else {
          // does not seem to work, Gson returns Map<String,String>
          // longMap = new Gson().fromJson(reader, longMapType);
          valueMap = new Gson().fromJson(reader, stringMapType);
        }
      }
    } catch (Exception e) {
      // failed to parse json, that is a bad request
      respondBadRequest(message, request, helper, "failed to read body as json: " + e.getMessage());
      return;
    }

    if (operation.equals(TableOp.Create)) {
      DataSetSpecification spec = new Table(tableName).configure();
      Dataset ds = new Dataset(spec.getName());
      ds.setName(spec.getName());
      ds.setType(spec.getType());
      ds.setSpecification(new Gson().toJson(spec));
      try {
        this.accessor.getMetadataService().assertDataset(new Account(opContext.getAccount()), ds);
      } catch (MetadataServiceException e) {
        respondBadRequest(message, request, helper, "table already exists", HttpResponseStatus.CONFLICT);
        return;
      } catch (TException e) {
        helper.finish(Error);
        LOG.error("Thrift error creating table: " + e.getMessage(), e);
        respondError(message.getChannel(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
        return;
      }
      respondSuccess(message.getChannel(), request);
      helper.finish(Success);
      return;
    }

    // make sure the table exists and instantiate dataset
    Table table;
    try {
      table = this.accessor.getInstantiator().getDataSet(tableName, opContext);
    } catch (Exception e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Cannot instantiate requested table '" + tableName + "' (" +
                    e.getMessage() + ") for URI '" + request.getUri() + "'");
      }
      helper.finish(BadRequest);
      respondError(message.getChannel(), HttpResponseStatus.NOT_FOUND);
      return;
    }

    // try to convert the row ket to bytes, using the given encoding
    byte[] rowKey;
    try {
      rowKey = row == null ? null : Util.decodeBytes(row, encoding);
    } catch (Exception e) {
      respondBadRequest(message, request, helper, "error decoding row key", e);
      return;
    }

    if (operation.equals(TableOp.List)) {
      // TODO not implemented
    } else if (operation.equals(TableOp.Read)) {
      Read read;
      try {
        if (columns == null || columns.isEmpty()) {
          // column range
          byte[] startCol = Util.decodeBytes(start, encoding);
          byte[] stopCol = Util.decodeBytes(stop, encoding);
          read = new Read(rowKey, startCol, stopCol, limit == null ? -1 : limit);
        } else {
          byte[][] cols = new byte[columns.size()][];

After Change


        if (columns == null || columns.isEmpty()) {
          // column range
          byte[] startCol = start == null ? null : Util.decodeBinary(start, encoding);
          byte[] stopCol = stop == null ? null : Util.decodeBinary(stop, encoding);
          read = new Read(rowKey, startCol, stopCol, limit == null ? -1 : limit);
        } else {
          byte[][] cols = new byte[columns.size()][];